package custom;

import beans.SensorReading;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

/**
 * Custom periodic timestamp extractor / watermark assigner for {@link SensorReading}.
 *
 * <p>Tracks the largest timestamp seen so far and emits a watermark that trails it by a
 * fixed out-of-orderness bound.
 *
 * @author lvbingbing
 * @date 2022-01-03 19:23
 */
public class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {

    /**
     * Allowed lateness: 1 minute (expressed as 60 * 1000, i.e. timestamps are presumed to be
     * in milliseconds).
     */
    private final long bound = 60 * 1000L;

    /**
     * Largest timestamp observed so far.
     *
     * <p>Seeded with {@code Long.MIN_VALUE + bound} rather than {@code Long.MIN_VALUE}:
     * otherwise {@code maxTs - bound} underflows before the first element arrives and wraps
     * around to a value near {@code Long.MAX_VALUE}, yielding a watermark that claims
     * virtually all event time has already passed.
     */
    private long maxTs = Long.MIN_VALUE + bound;

    /**
     * Returns the current watermark: the largest observed timestamp minus the bound.
     *
     * @return a watermark at {@code maxTs - bound}; this is {@code Long.MIN_VALUE} until an
     *         element with a larger timestamp has been seen
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Cannot underflow: maxTs never drops below Long.MIN_VALUE + bound.
        return new Watermark(maxTs - bound);
    }

    /**
     * Extracts the timestamp of the given element and updates the running maximum.
     *
     * @param element                  the reading whose timestamp is extracted
     * @param previousElementTimestamp timestamp previously attached to the element (unused)
     * @return the value of {@code element.getTimestamp()}
     */
    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        final long timestamp = element.getTimestamp();
        // maxTs only ever moves forward, so the emitted watermark is non-decreasing.
        maxTs = Math.max(maxTs, timestamp);
        return timestamp;
    }
}
